package com.isyscore.os.metadata.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.isyscore.os.core.model.entity.DataFlowTask;
import com.isyscore.os.core.util.InitiallyUtils;
import com.isyscore.os.metadata.service.executor.FlowExecutor;
import com.isyscore.os.metadata.kettle.dto.FlowTaskStepLogDTO;
import com.isyscore.os.metadata.model.entity.KettleStepLog;
import com.isyscore.os.metadata.model.entity.KettleTransLog;
import com.isyscore.os.metadata.service.DataFlowTaskService;
import com.isyscore.os.metadata.service.ETLFlowTaskLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

/**
 * @author wuwx
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ETLFlowTaskLogServiceImpl implements ETLFlowTaskLogService {
    private final KettleTransLogService kettleTransLogService;
    private final KettleStepLogService kettleStepLogService;
    private final DataFlowTaskService dataFlowTaskService;

    @Override
    public List<FlowTaskStepLogDTO> getTaskStepLog(String taskId) {
        //查询任务定义
        DataFlowTask dataFlowTask = dataFlowTaskService.getOne(Wrappers.lambdaQuery(DataFlowTask.class).eq(DataFlowTask::getKettleBatchId, taskId), false);
        if (ObjectUtil.isNotNull(dataFlowTask)) {
            //kettle日志跨租户查询
            List<KettleStepLog> kettleStepLogs = InitiallyUtils.markInitially(() -> kettleStepLogService.listGroup(taskId));
            List<FlowTaskStepLogDTO> flowTaskStepLogDTOS = Lists.newArrayList();
            for (KettleStepLog kettleStepLog : kettleStepLogs) {
                FlowTaskStepLogDTO flowTaskStepLogDTO = new FlowTaskStepLogDTO();
                BeanUtil.copyProperties(kettleStepLog, flowTaskStepLogDTO);
                flowTaskStepLogDTOS.add(flowTaskStepLogDTO);
            }
            return flowTaskStepLogDTOS;
        }
        return null;
    }

    @Override
    public String getTaskRawLog(String taskId) {
        if(StringUtils.isBlank(taskId)){
            return null;
        }
        DataFlowTask dataFlowTask = dataFlowTaskService.getOne(Wrappers.lambdaQuery(DataFlowTask.class).eq(DataFlowTask::getKettleBatchId, taskId), false);
        if(ObjectUtil.isNotNull(dataFlowTask)){
            //运行中，查询实时日志
            if (dataFlowTask.getStatus().equals(1)) {
                FlowExecutor executor = FlowExecutor.getExecutors().getIfPresent(taskId);
                if (ObjectUtil.isNull(executor)) {
                    return null;
                } else {
                    try {
                        return executor.getExecutionLog();
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
                //运行结束，查询历史日志
            } else {
                return InitiallyUtils.markInitially(() -> kettleTransLogService.getOne(Wrappers.lambdaQuery(KettleTransLog.class).eq(KettleTransLog::getIdBatch, taskId), false)).getLogField();
            }
        }
        return null;
    }

    @Override
    public List<Map<String, String>> getTaskStepStatus(String taskId) {
        FlowExecutor executor = FlowExecutor.getExecutors().getIfPresent(taskId);
        if (ObjectUtil.isNull(executor)) {
            return null;
        }else{
           return executor.getStepStatus();
        }
    }
}
